package com.yomahub.liteflow.flow.parallel.strategy;

import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.LongAdder;

/**
 * 完成指定阈值任务
 *
 * @author luo yi
 * @since 2.13.4
 */
public class PercentageOfParallelExecutor extends ParallelStrategyExecutor {

    @Override
    public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {

        // 获取所有 CompletableFuture 任务
        List<CompletableFuture<WhenFutureObj>> whenAllTaskList = this.getWhenAllTaskList(whenCondition, slotIndex);

        int total = whenAllTaskList.size();

        // 计算阈值数量（向上取整），为 0 时取 1，表示只等待一个完成，即 any
        int thresholdCount = Math.max(1, (int) Math.ceil(total * whenCondition.getPercentage()));

        // 已完成任务收集器
        ConcurrentLinkedQueue<CompletableFuture<WhenFutureObj>> completedFutures = new ConcurrentLinkedQueue<>();

        // 阈值触发门闩
        CompletableFuture<Void> thresholdFuture = new CompletableFuture<>();

        // 原子计数器
        LongAdder completedCount = new LongAdder();

        // 为每个任务添加回调
        whenAllTaskList.forEach(future ->
                future.whenComplete((result, ex) -> {
                    // 计数 +1
                    completedCount.increment();

                    int currentCount = completedCount.intValue();

                    if (currentCount <= thresholdCount) {
                        // 添加已完成任务
                        completedFutures.add(future);
                    }

                    // 达到阈值时触发门闩（确保只触发一次）
                    if (currentCount >= thresholdCount && !thresholdFuture.isDone()) {
                        thresholdFuture.complete(null);
                    }
                })
        );

        // 创建组合任务（仅包含已完成任务）
        CompletableFuture<Void> combinedTask = thresholdFuture.thenRun(() -> {
            // 达到阈值时创建 allOf 任务
            CompletableFuture.allOf(completedFutures.toArray(new CompletableFuture[]{})).join();
        });

        // 处理结果（会阻塞直到阈值任务完成）
        this.handleTaskResult(whenCondition, slotIndex, whenAllTaskList, combinedTask);

    }

}
